{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<a href=\"https://colab.research.google.com/github/run-llama/llama_index/blob/main/docs/examples/workflow/router_query_engine.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Router Query Engine\n",
    "\n",
    "`RouterQueryEngine` chooses the most appropriate query engine from multiple options to process a given query.\n",
    "\n",
    "This notebook walks through implementation of Router Query Engine, using workflows.\n",
    "\n",
    "Specifically we will implement [RouterQueryEngine](https://docs.llamaindex.ai/en/stable/examples/query_engine/routerqueryengine/)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!pip install -U llama-index"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "\n",
    "os.environ[\"OPENAI_API_KEY\"] = \"sk-..\""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Since workflows are async first, this all runs fine in a notebook. If you were running in your own code, you would want to use `asyncio.run()` to start an async event loop if one isn't already running.\n",
    "\n",
    "```python\n",
    "async def main():\n",
    "    <async code>\n",
    "\n",
    "if __name__ == \"__main__\":\n",
    "    import asyncio\n",
    "    asyncio.run(main())\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Define Events"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from llama_index.core.workflow import Event\n",
    "from llama_index.core.base.base_selector import SelectorResult\n",
    "from typing import Dict, List, Any\n",
    "from llama_index.core.base.response.schema import RESPONSE_TYPE\n",
    "\n",
    "\n",
    "class QueryEngineSelectionEvent(Event):\n",
    "    \"\"\"Result of selecting the query engine tools.\"\"\"\n",
    "\n",
    "    selected_query_engines: SelectorResult\n",
    "\n",
    "\n",
    "class SynthesizeEvent(Event):\n",
    "    \"\"\"Event for synthesizing the response from different query engines.\"\"\"\n",
    "\n",
    "    result: List[RESPONSE_TYPE]\n",
    "    selected_query_engines: SelectorResult"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## The Workflow\n",
    "\n",
    "`selector:`\n",
    "\n",
    "1. It takes a StartEvent as input and returns a QueryEngineSelectionEvent.\n",
    "2. The `LLMSingleSelector`/ `PydanticSingleSelector`/ `PydanticMultiSelector` will select one/ multiple query engine tools.\n",
    "\n",
    "`generate_responses:`\n",
    "\n",
    "This function uses the selected query engines to generate responses and returns SynthesizeEvent.\n",
    "\n",
    "`synthesize_responses:`\n",
    "\n",
    "This function combines the generated responses and synthesizes the final response if multiple query engines are selected otherwise returns the single generated response.\n",
    "\n",
    "\n",
    "The steps will use the built-in `StartEvent` and `StopEvent` events.\n",
    "\n",
    "With our events defined, we can construct our workflow and steps. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from llama_index.core.workflow import (\n",
    "    Context,\n",
    "    Workflow,\n",
    "    StartEvent,\n",
    "    StopEvent,\n",
    "    step,\n",
    ")\n",
    "\n",
    "from llama_index.llms.openai import OpenAI\n",
    "from llama_index.core.selectors.utils import get_selector_from_llm\n",
    "from llama_index.core.base.response.schema import (\n",
    "    PydanticResponse,\n",
    "    Response,\n",
    "    AsyncStreamingResponse,\n",
    ")\n",
    "from llama_index.core.bridge.pydantic import BaseModel\n",
    "from llama_index.core.response_synthesizers import TreeSummarize\n",
    "from llama_index.core.schema import QueryBundle\n",
    "from llama_index.core import Settings\n",
    "\n",
    "from IPython.display import Markdown, display\n",
    "import asyncio\n",
    "\n",
    "\n",
    "class RouterQueryEngineWorkflow(Workflow):\n",
    "    @step\n",
    "    async def selector(\n",
    "        self, ctx: Context, ev: StartEvent\n",
    "    ) -> QueryEngineSelectionEvent:\n",
    "        \"\"\"\n",
    "        Selects a single/ multiple query engines based on the query.\n",
    "        \"\"\"\n",
    "\n",
    "        await ctx.store.set(\"query\", ev.get(\"query\"))\n",
    "        await ctx.store.set(\"llm\", ev.get(\"llm\"))\n",
    "        await ctx.store.set(\"query_engine_tools\", ev.get(\"query_engine_tools\"))\n",
    "        await ctx.store.set(\"summarizer\", ev.get(\"summarizer\"))\n",
    "\n",
    "        llm = Settings.llm\n",
    "        select_multiple_query_engines = ev.get(\"select_multi\")\n",
    "        query = ev.get(\"query\")\n",
    "        query_engine_tools = ev.get(\"query_engine_tools\")\n",
    "\n",
    "        selector = get_selector_from_llm(\n",
    "            llm, is_multi=select_multiple_query_engines\n",
    "        )\n",
    "\n",
    "        query_engines_metadata = [\n",
    "            query_engine.metadata for query_engine in query_engine_tools\n",
    "        ]\n",
    "\n",
    "        selected_query_engines = await selector.aselect(\n",
    "            query_engines_metadata, query\n",
    "        )\n",
    "\n",
    "        return QueryEngineSelectionEvent(\n",
    "            selected_query_engines=selected_query_engines\n",
    "        )\n",
    "\n",
    "    @step\n",
    "    async def generate_responses(\n",
    "        self, ctx: Context, ev: QueryEngineSelectionEvent\n",
    "    ) -> SynthesizeEvent:\n",
    "        \"\"\"Generate the responses from the selected query engines.\"\"\"\n",
    "\n",
    "        query = await ctx.store.get(\"query\", default=None)\n",
    "        selected_query_engines = ev.selected_query_engines\n",
    "        query_engine_tools = await ctx.store.get(\"query_engine_tools\")\n",
    "\n",
    "        query_engines = [engine.query_engine for engine in query_engine_tools]\n",
    "\n",
    "        print(\n",
    "            f\"number of selected query engines: {len(selected_query_engines.selections)}\"\n",
    "        )\n",
    "\n",
    "        if len(selected_query_engines.selections) > 1:\n",
    "            tasks = []\n",
    "            for selected_query_engine in selected_query_engines.selections:\n",
    "                print(\n",
    "                    f\"Selected query engine: {selected_query_engine.index}: {selected_query_engine.reason}\"\n",
    "                )\n",
    "                query_engine = query_engines[selected_query_engine.index]\n",
    "                tasks.append(query_engine.aquery(query))\n",
    "\n",
    "            response_generated = await asyncio.gather(*tasks)\n",
    "\n",
    "        else:\n",
    "            query_engine = query_engines[\n",
    "                selected_query_engines.selections[0].index\n",
    "            ]\n",
    "\n",
    "            print(\n",
    "                f\"Selected query engine: {selected_query_engines.ind}: {selected_query_engines.reason}\"\n",
    "            )\n",
    "\n",
    "            response_generated = [await query_engine.aquery(query)]\n",
    "\n",
    "        return SynthesizeEvent(\n",
    "            result=response_generated,\n",
    "            selected_query_engines=selected_query_engines,\n",
    "        )\n",
    "\n",
    "    async def acombine_responses(\n",
    "        self,\n",
    "        summarizer: TreeSummarize,\n",
    "        responses: List[RESPONSE_TYPE],\n",
    "        query_bundle: QueryBundle,\n",
    "    ) -> RESPONSE_TYPE:\n",
    "        \"\"\"Async combine multiple response from sub-engines.\"\"\"\n",
    "\n",
    "        print(\"Combining responses from multiple query engines.\")\n",
    "\n",
    "        response_strs = []\n",
    "        source_nodes = []\n",
    "        for response in responses:\n",
    "            if isinstance(\n",
    "                response, (AsyncStreamingResponse, PydanticResponse)\n",
    "            ):\n",
    "                response_obj = await response.aget_response()\n",
    "            else:\n",
    "                response_obj = response\n",
    "            source_nodes.extend(response_obj.source_nodes)\n",
    "            response_strs.append(str(response))\n",
    "\n",
    "        summary = await summarizer.aget_response(\n",
    "            query_bundle.query_str, response_strs\n",
    "        )\n",
    "\n",
    "        if isinstance(summary, str):\n",
    "            return Response(response=summary, source_nodes=source_nodes)\n",
    "        elif isinstance(summary, BaseModel):\n",
    "            return PydanticResponse(\n",
    "                response=summary, source_nodes=source_nodes\n",
    "            )\n",
    "        else:\n",
    "            return AsyncStreamingResponse(\n",
    "                response_gen=summary, source_nodes=source_nodes\n",
    "            )\n",
    "\n",
    "    @step\n",
    "    async def synthesize_responses(\n",
    "        self, ctx: Context, ev: SynthesizeEvent\n",
    "    ) -> StopEvent:\n",
    "        \"\"\"Synthesizes the responses from the generated responses.\"\"\"\n",
    "\n",
    "        response_generated = ev.result\n",
    "        query = await ctx.store.get(\"query\", default=None)\n",
    "        summarizer = await ctx.store.get(\"summarizer\")\n",
    "        selected_query_engines = ev.selected_query_engines\n",
    "\n",
    "        if len(response_generated) > 1:\n",
    "            response = await self.acombine_responses(\n",
    "                summarizer, response_generated, QueryBundle(query_str=query)\n",
    "            )\n",
    "        else:\n",
    "            response = response_generated[0]\n",
    "\n",
    "        response.metadata = response.metadata or {}\n",
    "        response.metadata[\"selector_result\"] = selected_query_engines\n",
    "\n",
    "        return StopEvent(result=response)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Define LLM"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "llm = OpenAI(model=\"gpt-4o-mini\")\n",
    "Settings.llm = llm"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Define Summarizer"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from llama_index.core.prompts.default_prompt_selectors import (\n",
    "    DEFAULT_TREE_SUMMARIZE_PROMPT_SEL,\n",
    ")\n",
    "\n",
    "summarizer = TreeSummarize(\n",
    "    llm=llm,\n",
    "    summary_template=DEFAULT_TREE_SUMMARIZE_PROMPT_SEL,\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Download Data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "--2024-08-26 22:46:42--  https://raw.githubusercontent.com/run-llama/llama_index/main/docs/examples/data/paul_graham/paul_graham_essay.txt\n",
      "Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 2606:50c0:8000::154, 2606:50c0:8003::154, 2606:50c0:8002::154, ...\n",
      "Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|2606:50c0:8000::154|:443... connected.\n",
      "HTTP request sent, awaiting response... 200 OK\n",
      "Length: 75042 (73K) [text/plain]\n",
      "Saving to: ‘data/paul_graham/paul_graham_essay.txt’\n",
      "\n",
      "data/paul_graham/pa 100%[===================>]  73.28K  --.-KB/s    in 0.02s   \n",
      "\n",
      "2024-08-26 22:46:42 (3.82 MB/s) - ‘data/paul_graham/paul_graham_essay.txt’ saved [75042/75042]\n",
      "\n"
     ]
    }
   ],
   "source": [
    "!mkdir -p 'data/paul_graham/'\n",
    "!wget 'https://raw.githubusercontent.com/run-llama/llama_index/main/docs/examples/data/paul_graham/paul_graham_essay.txt' -O 'data/paul_graham/paul_graham_essay.txt'"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Load Data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from llama_index.core import SimpleDirectoryReader\n",
    "\n",
    "documents = SimpleDirectoryReader(\"./data/paul_graham\").load_data()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create Nodes"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "nodes = Settings.node_parser.get_nodes_from_documents(documents)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create Indices\n",
    "\n",
    "We will create three indices SummaryIndex, VectorStoreIndex and SimpleKeywordTableIndex."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from llama_index.core import (\n",
    "    VectorStoreIndex,\n",
    "    SummaryIndex,\n",
    "    SimpleKeywordTableIndex,\n",
    ")\n",
    "\n",
    "summary_index = SummaryIndex(nodes)\n",
    "vector_index = VectorStoreIndex(nodes)\n",
    "keyword_index = SimpleKeywordTableIndex(nodes)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create Query Engine Tools"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from llama_index.core.tools import QueryEngineTool\n",
    "\n",
    "list_query_engine = summary_index.as_query_engine(\n",
    "    response_mode=\"tree_summarize\",\n",
    "    use_async=True,\n",
    ")\n",
    "vector_query_engine = vector_index.as_query_engine()\n",
    "keyword_query_engine = keyword_index.as_query_engine()\n",
    "\n",
    "list_tool = QueryEngineTool.from_defaults(\n",
    "    query_engine=list_query_engine,\n",
    "    description=(\n",
    "        \"Useful for summarization questions related to Paul Graham eassy on\"\n",
    "        \" What I Worked On.\"\n",
    "    ),\n",
    ")\n",
    "\n",
    "vector_tool = QueryEngineTool.from_defaults(\n",
    "    query_engine=vector_query_engine,\n",
    "    description=(\n",
    "        \"Useful for retrieving specific context from Paul Graham essay on What\"\n",
    "        \" I Worked On.\"\n",
    "    ),\n",
    ")\n",
    "\n",
    "keyword_tool = QueryEngineTool.from_defaults(\n",
    "    query_engine=keyword_query_engine,\n",
    "    description=(\n",
    "        \"Useful for retrieving specific context using keywords from Paul\"\n",
    "        \" Graham essay on What I Worked On.\"\n",
    "    ),\n",
    ")\n",
    "\n",
    "query_engine_tools = [list_tool, vector_tool, keyword_tool]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Run the Workflow!"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import nest_asyncio\n",
    "\n",
    "nest_asyncio.apply()\n",
    "\n",
    "w = RouterQueryEngineWorkflow(timeout=200)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Querying"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Summarization Query"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "number of selected query engines: 1\n",
      "Selected query engine: 0: This choice directly addresses the need for a summary of the document.\n"
     ]
    },
    {
     "data": {
      "text/markdown": [
       "> Question: Provide the summary of the document?"
      ],
      "text/plain": [
       "<IPython.core.display.Markdown object>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/markdown": [
       "Answer: The document recounts the journey of an individual who transitioned from writing and programming in his youth to exploring artificial intelligence and eventually becoming a successful entrepreneur and essayist. Initially drawn to philosophy in college, he found it unfulfilling and shifted his focus to AI, inspired by literature and documentaries. His academic pursuits led him to reverse-engineer a natural language program, but he soon realized the limitations of AI at the time.\n",
       "\n",
       "After completing his PhD, he ventured into the art world, taking classes and painting, while also working on a book about Lisp programming. His experiences in the tech industry, particularly at a software company, shaped his understanding of business dynamics and the importance of being an entry-level option in the market.\n",
       "\n",
       "In the mid-1990s, he co-founded Viaweb, an early web application for building online stores, which was later acquired by Yahoo. Following this, he became involved in angel investing and co-founded Y Combinator, a startup accelerator that revolutionized seed funding by supporting multiple startups simultaneously.\n",
       "\n",
       "The narrative highlights the author's reflections on the nature of work, the significance of pursuing unprestigious projects, and the evolution of his interests from programming to writing essays. He emphasizes the value of independent thinking and the impact of the internet on publishing and entrepreneurship. Ultimately, the document illustrates a life characterized by exploration, creativity, and a commitment to helping others succeed in their ventures."
      ],
      "text/plain": [
       "<IPython.core.display.Markdown object>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "# This should use summary query engine/ tool.\n",
    "\n",
    "query = \"Provide the summary of the document?\"\n",
    "\n",
    "result = await w.run(\n",
    "    query=query,\n",
    "    llm=llm,\n",
    "    query_engine_tools=query_engine_tools,\n",
    "    summarizer=summarizer,\n",
    "    select_multi=True,  # You can change it to default it to select only one query engine.\n",
    ")\n",
    "\n",
    "display(\n",
    "    Markdown(\"> Question: {}\".format(query)),\n",
    "    Markdown(\"Answer: {}\".format(result)),\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Pointed Context Query"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "number of selected query engines: 1\n",
      "Selected query engine: 1: The question asks for specific context about the author's experiences growing up, which aligns with retrieving specific context from the essay.\n"
     ]
    },
    {
     "data": {
      "text/markdown": [
       "> Question: What did the author do growing up?"
      ],
      "text/plain": [
       "<IPython.core.display.Markdown object>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/markdown": [
       "Answer: Growing up, the author focused on writing and programming outside of school. Initially, he wrote short stories, which he later described as lacking in plot but rich in character emotions. He began programming at a young age on an IBM 1401, where he experimented with early Fortran and punch cards. Eventually, he convinced his father to buy a TRS-80 microcomputer, which allowed him to write simple games and a word processor. Despite enjoying programming, he initially planned to study philosophy in college, believing it to be a pursuit of ultimate truths. However, he later switched his focus to artificial intelligence after finding philosophy courses unengaging."
      ],
      "text/plain": [
       "<IPython.core.display.Markdown object>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "# This should use vector query engine/ tool.\n",
    "\n",
    "query = \"What did the author do growing up?\"\n",
    "\n",
    "result = await w.run(\n",
    "    query=query,\n",
    "    llm=llm,\n",
    "    query_engine_tools=query_engine_tools,\n",
    "    summarizer=summarizer,\n",
    "    select_multi=False,  # You can change it to select multiple query engines.\n",
    ")\n",
    "\n",
    "display(\n",
    "    Markdown(\"> Question: {}\".format(query)),\n",
    "    Markdown(\"Answer: {}\".format(result)),\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "number of selected query engines: 2\n",
      "Selected query engine: 1: This choice is useful for retrieving specific context related to notable events and people from the author's time at Interleaf and YC.\n",
      "Selected query engine: 2: This choice allows for retrieving specific context using keywords, which can help in identifying notable events and people.\n",
      "Combining responses from multiple query engines.\n"
     ]
    },
    {
     "data": {
      "text/markdown": [
       "> Question: What were noteable events and people from the authors time at Interleaf and YC?"
      ],
      "text/plain": [
       "<IPython.core.display.Markdown object>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/markdown": [
       "Answer: Notable events during the author's time at Interleaf included the establishment of a large Release Engineering group, which underscored the complexities of software updates and version management. The company also made a significant decision to incorporate a scripting language inspired by Emacs, aimed at attracting Lisp hackers to enhance their software capabilities. The author reflected on this period as the closest they had to a normal job, despite acknowledging their shortcomings as an employee.\n",
       "\n",
       "At Y Combinator (YC), key events included the launch of the first Summer Founders Program, which received 225 applications and funded eight startups, featuring notable figures such as the founders of Reddit, Justin Kan and Emmett Shear (who later founded Twitch), and Aaron Swartz. The program fostered a supportive community among founders and marked a transition for YC from a small initiative to a larger organization. Significant individuals during this time included Jessica Livingston, with whom the author had a close professional and personal relationship, as well as Robert Morris and Trevor Blackwell, who contributed to the development of shopping cart software and were recognized for their programming skills, respectively. Sam Altman, who later became the second president of YC, was also mentioned as a significant figure in this period."
      ],
      "text/plain": [
       "<IPython.core.display.Markdown object>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "# This query could use either a keyword or vector query engine\n",
    "# so it will combine responses from both\n",
    "\n",
    "query = \"What were noteable events and people from the authors time at Interleaf and YC?\"\n",
    "\n",
    "result = await w.run(\n",
    "    query=query,\n",
    "    llm=llm,\n",
    "    query_engine_tools=query_engine_tools,\n",
    "    summarizer=summarizer,\n",
    "    select_multi=True,  # Since query should use two query engine tools, we enabled it.\n",
    ")\n",
    "\n",
    "display(\n",
    "    Markdown(\"> Question: {}\".format(query)),\n",
    "    Markdown(\"Answer: {}\".format(result)),\n",
    ")"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "llamaindex",
   "language": "python",
   "name": "llamaindex"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
